跳到主要内容

RabbitMQ 的基本概念与配置

什么是 MQ

参考资料 RabbitMQ 官方地址

MQ 是 Message Queue 的缩写,消息队列是应用程序和应用程序之间的通信方法

使用场景

1、任务异步处理 作用和前端异步编程一样,都是为了节省时间,可以把耗时的操作且对同步要求不高的丢到最后执行(当时说的 IO 密集型任务)例如下载图片这种不需要马上返回结果的操作

2、应用程序解耦合 例如 SOA 架构中心化的实现,每个服务或应用程序之间都通过这个来传递消息(从而达到解耦合的目的)

MQ 的两种主要的实现方式 AMQP 和 JMS

AMQP 是一种网络层消息传递协议(就像 HTTP 那样),所以 RabbitMQ 不从 API 层进行限定,而是直接 通过协议来定义交换的数据 只需遵守这个协议的规范就能获取到相应的数据(所以可移植性也更好)

JMS(Java Message Service)是 Java 平台中关于面向消息中间件的 API

支持的模式

simplest(简单模式)、Work queues、Publish/Subscribe(发布/订阅)、Routing(路由)、Topics(通配符)、RPC

MQ 应用场景

异步处理

例如注册时,注册完成需要发送邮件和短信给用户,但是因为这两者对用户注册影响不大,所以可以引入消息队列来统一监听处理

传统串行的方式

传统并行的方式

引入消息队列

image.png

应用解耦

传统的每个模块都是强耦合的(直接调用),例如订单系统调用库存系统的接口 image.png

但是上面这样直接调用模块有个缺点,就是当库存系统失败之后,这个订单就会失败(消息丢失)。所以为了解决这种因为模块失败导致的消息丢失可以使用消息队列来传递这个消息,这样就算模块执行失败,这个消息依然没有丢失(具体看上面的 “工作模型”)

流量削峰

当流量很大时(例如双十一),可以使用这个 MQ 来控制活动人数,超过此阈值的订单直接丢弃

image.png

当超过消息队列长度超过最大值时,则直接抛弃用户的请求或跳转到错误页上去

配置环境

手动安装

因为 RabbitMQ 是基于 erlang 语言开发的,所以需要先安装 erlang 语言环境,再安装 rabbit

注:找不到包可以用

# 只找包名相关
# apt-cache pkgnames <search_term>
# 找到了就可以直接安装了
# sudo apt-get install erlang
# ============因为在 Ubuntu 市场上的软件比官方的版本低了很多,所以官方不建议使用这种方式==================

# 先把 rabbit 库的指纹添加上去
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-key adv --keyserver "hkps://keys.openpgp.org" --recv-keys "0x0A9AF2115F4687BD29803A206B73A36E6026DFCA"

sudo apt-get install apt-transport-https

# 注:deb 不是一个命令,而是说让你把这个源丢到某个地方
# 然后进到 /etc/apt/sources.list.d/ 里面
# 创建一个 bintray.erlang.list 文件(把下面 deb 开头的命令粘贴进去)
deb https://dl.bintray.com/rabbitmq-erlang/debian $distribution $component
deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang-22.x
deb https://dl.bintray.com/rabbitmq-erlang/debian buster erlang-22.x
deb https://dl.bintray.com/rabbitmq-erlang/debian stretch erlang-22.x
deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang

# 更新一下库
sudo apt-get update -y
# 再安装 erlang
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

# 监察版本
erl -version

# ===================安装 rabbitMQ =========================
# 导入包传输密钥
wget -O - "https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey" | sudo apt-key add -
# 添加源到 bintray.erlang.list 文件
deb https://dl.bintray.com/rabbitmq/debian bionic main
# 更新一下源
sudo apt-get update -y
# 安装
sudo apt-get install rabbitmq-server


# 安装完成之后启动需要使用 root用户,所以加上个 sudo
sudo rabbitmq-server

# 启动图形界面的插件也需要使用 root 权限(sudo rabbitmq-plugins list 可以查看哪些工具可以用)
sudo rabbitmq-plugins enable rabbitmq_management
# 然后就可以进管理界面了 http://127.0.0.1:15672/
# 输入默认账号: guest 密码: guest
# 可以对插件使用的命令(关闭、启动、List、重置)
disable # Disables one or more plugins
enable # Enables one or more plugins
list # Lists plugins and their state
set # Enables one or more plugins, disables the rest

使用 Docker 安装

# 搜索一下(一般是 https://hub.docker.com/)
docker search rabbit
# 拉取 rabbitmq (带有 “mangement” 的版本 包含web管理页面)
docker pull rabbitmq:3.8-management

# 创建守护式容器
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.8-management

# 用账户 admin 密码 admin 登陆 http://127.0.0.1:15672/ 管理界面

搭建 MQ 集群(单机)

创建一个叫做 docker-compose.yml 的文件

version: '3'
services:
rabbitmq1:
image: rabbitmq:3.8-management
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ./mnesia1:/var/lib/rabbitmq
container_name: rabbitmq1
hostname: rabbitmq1
environment:
# 注意这里 RABBITMQ_NODENAME=rabbitmq1 表示节点名为 rabbitmq1,连接的格式是 节点名@主机
# 所以:下面也需要使用 rabbitmq1@rabbitmq1 才能连接的上,如果不加这个则默认是使用 rabbit@rabbitmq1
- RABBITMQ_NODENAME=rabbitmq1
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
# 注意:因为 RabbitMQ是用Erlang实现的,Erlang Cookie相当于不同节点之间相互通讯的秘钥,Erlang节点通过交换Erlang Cookie获得认证。 RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION 已经过期了
rabbitmq2:
image: rabbitmq:3.8-management
ports:
- "5673:5672"
volumes:
- ./mnesia2:/var/lib/rabbitmq
container_name: rabbitmq2
hostname: rabbitmq2
environment:
- RABBITMQ_NODENAME=rabbitmq2
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- CLUSTER_WITH=rabbitmq1
- RAM_NODE=true
# 这个 link 参数是将 rabbitmq1 与 rabbitmq2 关联起来(多个容器之间使用这个 link 参数连接)
# 就是执行 rabbitmqctl join_cluster --ram rabbitmq1@rabbitmq1 时用上的
links:
- rabbitmq1
rabbitmq3:
image: rabbitmq:3.8-management
ports:
- "5674:5672"
volumes:
- ./mnesia3:/var/lib/rabbitmq
container_name: rabbitmq3
hostname: rabbitmq3
environment:
- RABBITMQ_NODENAME=rabbitmq3
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- CLUSTER_WITH=rabbitmq1
- RAM_NODE=true
links:
- rabbitmq1
- rabbitmq2

再进入到对应的目录里,把 .erlang.cookie 文件全部都复制到对应的目录使之内容一样(可以通过 cat .erlang.cookie 来检查)

然后进入到各个节点里重启一下服务或连接主节点

docker exec -it myrabbit1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

docker exec -it myrabbit2 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app
exit


docker exec -it myrabbit3 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app
exit

然后访问

http://localhost:15672/

输入账号密码(user:admin、password:admin),就能看到控制台了

搭建 MQ 集群(真实)

基本上和上面的搭建方式大同小异,但是 docker-compose 无需创建三个 rabbitmq 了,每个主机都创建一个就好了

version: '3'

services:
  rabbitmq1:
    image: rabbitmq:3.8.3-management
    container_name: rabbitmq1
    restart: always
    hostname: rabbitmq1
    ports:
      - 4369:4369
      - 5671:5671
      - 25672:25672
      - 15672:15672
      - 5672:5672
    volumes:
      - ./data:/var/lib/rabbitmq
      - ./config/rabbitmq.sh:/etc/rabbitmq/rabbitmq.sh
      - /etc/hosts:/etc/hosts
    environment:
      - RABBITMQ_DEFAULT_USER=root
      - RABBITMQ_DEFAULT_PASS=root
      - RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION

其它节点只需改下对应的名称就好了

192.168.99.241 rabbitmq1
192.168.99.242 rabbitmq2
192.168.99.243 rabbitmq3

在 hosts 文件里加上对应节点的 ip 以及映射的主机名称(这个作用就像是 windows 的那个 host 文件一样,路径是 /etc/hosts),就可以直接使用对应的主机名来访问了,例如:rabbit@rabbitmq1,其它的加入节点之类的操作和上面的单机部分一样

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq1
rabbitmqctl start_app 

RabbitMQ 中的重要概念

参考资料 RabbitMQ学习(一):RabbitMQ要点简介 参考资料 廖雪峰老师的 集成RabbitMQ

(1)Broker: 经纪人。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的 RabbitMQ Server 当作 Broker。

(2)Exchange: 消息交换机。指定消息按照什么规则路由到哪个队列 Queue。

(3)Queue: 消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。

(4)Binding: 绑定。作用就是将 Exchange 和 Queue 按照某种路由规则绑定起来。

(5)RoutingKey: 路由关键字。Exchange 根据 RoutingKey 进行消息投递。

(6)Vhost: 虚拟主机。一个 Broker 可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组 Exchange、Queue 和 Binding。

(7)Producer: 消息生产者。主要将消息投递到对应的 Exchange上面。一般是独立的程序。

(8)Consumer: 消息消费者。消息的接收者,一般是独立的程序。

(9)Channel: 消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。

即当 Producer 想要发送消息的时候,它将消息发送给 Exchange,由 Exchange 将消息根据各种规则投递到一个或多个Queue:

                                      ┌───────┐
┌───>│Queue-1│
┌──────────┐ │ └───────┘
┌──>│Exchange-1│───┤
┌──────────┐ │ └──────────┘ │ ┌───────┐
│Producer-1│──┤ ├───>│Queue-2│
└──────────┘ │ ┌──────────┐ │ └───────┘
└──>│Exchange-2│───┤
└──────────┘ │ ┌───────┐
└───>│Queue-3│
└───────┘

而上面 Exchange 就是通过 Binding(路由规则)来绑定 Queue 的

常用的交换机

参考资料 Springboot 整合RabbitMq ,用心看完这一篇就够了 参考资料 RabbitMQ学习(一):RabbitMQ要点简介

常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种(其它的 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机 用到再学习):

Direct Exchange 

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

默认的预先定义exchange名字:空字符串或者 amq.direct

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。 然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

如图中 RoutingKey 分别是 error、info、warning,其中 error被 Binding(绑定)到 queue1和queue2上,info和 warning被 Binding到 queue2上。

当消息的 RoutingKey是 error,这条消息将被投递到 queue1和 queue2中(相当于消息被复制成两个分别投放到两个 queue中),然后分别被 Consumer1和 Consumer2处理。如果消息的 RoutingKey是 info或者 warning,这条消息只会被投递到 queue2中,然后被 Consumer2处理。

如果消息的 RoutingKey是其他的字符串,这条消息则会被丢弃。

Fanout Exchange

扇型交换机(广播模式),这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

默认的预先定义exchange名字:amq.fanout

如图中,没有 RoutingKey 的限制,只要消息到达 Exchange,都会被投递到 queue1 和 queue2 中,然后被对应的 Consumer处理。

Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。 简单地介绍下规则:

*  (星号) 用来表示一个单词 (必须出现的) #  (井号) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的,举个小例子

队列 Q1 绑定键为 *.TT.*,队列Q2绑定键为  TT.#

  • 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
  • 如果一条消息携带的路由键为 TT.AA.BB,那么队列Q2将会收到;

实际上 Topic Exchange 就足够完成上面两种交换机了

1 当一个队列的绑定键为 #(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息,即 Fanout Exchange 的功能。

2 当 *# 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

所以一个主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

如图中,假如消息的 RoutingKey 是 American.action.13,这条消息将被投递到 Q1 和 Q2 中。

假如 RoutingKey 是 American.action.13.test(注意:此处是四个词),这条消息将会被丢弃,因为没有 routingkey 与之匹配。

假如 RoutingKey 是 Chinese.action.13,这条消息将被投递到 Q2 和 Q3 中。

假如 RoutingKey 是 Chinese.action.13.test,这条消息只会被投递到 Q3中,# 可以匹配一个或者多个单词,而 * 只能匹配一个词。

channel 信道

概念:信道是生产消费者与 rabbit 通信的渠道,生产者 publish 或是消费者 subscribe 一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?

就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。

疑问:为什么不建立多个TCP连接呢?原因是rabbit保证性能,系统为每个线程开辟一个TCP是非常消耗性能, 每秒成百上千的建立销毁TCP会严重消耗系统。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接) 连接到rabbit上。

类似概念:TCP是电缆,信道就是里面的光纤,每个光纤都是独立的,互不影响。

确认机制(ack)

1、发送方确认模式:消息发送到交换器 > 发送完毕 > 消息投递到队列或持久化到磁盘异步回调通知生产者

2、消费者确认机制:消息投递消费者 > ack > 删除该条消息 > 投递下一条

注:收到ACK前,不会把消息再次发送给该消费者,但是会把下一条消息发送给其他消费者

RabbitMQ的使用流程

AMQP 模型中,消息在 producer 中产生,发送到 MQ 的 exchange 上,exchange 根据配置的路由方式投递到相应的 Queue上,Queue又将消息发送给已经在此 Queue上注册的 consumer,消息从 queue到 consumer有 push和 pull两种方式。

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel。

(2)客户端声明一个 exchange,并设置相关属性。

(3)客户端声明一个 queue,并设置相关属性。

(4)客户端使用 routing key,在 exchange 和 queue 之间建立好 Binding 关系。

(5)生产者客户端投递消息到 exchange。

(6)exchange 接收到消息后,就根据消息的 RoutingKey 和已经设置的 binding,进行消息路由(投递),将消息投递到一个或多个队列里。

(7)消费者客户端从对应的队列中获取并处理消息。

Web 管理界面

因为直接上来就看代码学习这个 RabbitMQ 实在太痛苦了,所以这里通过学习它的 Web界面来上手

这里直接转载大佬的博客 RabbitMQ 之 使用Web管理界面认识RabbitMQ

RabbitMQ的管理界面

添加交换器:

将三种类型的交换器都添加一个(注意 amq 开头的交换机是删不了的):

切换到消息队列栏同上:

将 Exchange 与 Queue 绑定

点击 Exchange 的 Name可进入到 Exchange 的详情页面,在里面将 Exchange 与 Queue 进行 Binding

zs.directzs.fanoutzs.topic 与各 Queue 绑定后的结果

direct(点对点)测试

测试 zs.direct(点对点)

点进交换器之后有一栏是 Publish message,我们发送消息就是在这里进行发送的:

填写好之后,点击 Publish message 发送。发送成功后切换到 Queues 查看是否收到消息,很显然 ls 收到了一条消息,点击 ls 进去查看

进来之后,展开 Get message 选项,点击 GetMessage(s) 就能得到消息了

下面是得到的之前发送过来的消息

fanout(广播)测试

测试 zs.fanout(广播)

同上面测试一样,发送一条消息给 ls :

但是查看消息队列时会发现,zs.fanout 下的所有消息队列都接收到了:

但是,在查看ls所收到的消息时,始终都是之前那一条:

由于消息获取来之后并没有给消息队列进行应答,将接收的消息删除。解决办法就是把 Ack Mode 改成第二个选项就可以了。

topic(发布订阅)测试

测试 zs.topic(发布订阅)

消息队列中与 test.news 匹配上的有:ls.newsww.news,所以它们是会多收到一条消息的。

下面是收到的 ls.news 中收到的来自 zs.topic 的消息:

RabbitMQ 的集群

官方文档 clustering

因为引入了 RabbitMQ 这个服务来当中间件,对消息进行分发,所以如果 MQ 挂掉了,就会导致所有的服务都挂断,因此可以部署集群来做个保险,以免出现这种情况

普通集群(主备复制集群)

image.png 这种模式只同步 Exchange 而不同步队列,所以就算切换过来了,数据也一样会丢失,所以这种集群模式一般是用于缓解主节点的并发压力(主节点没有挂的情况下,消费者可以直接通过从节点来取得消息)

镜像集群

image.png

主备模式:主节点提供读写,从节点不提供读写服务,只是负责提供备份服务,备份节点的主要功能是在主节点宕机时,完成自动切换 从 > 主

TODO: 集群用到再更新...